package engine

import (
	"context"
	"errors"
	"net/http"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/httpgrpc"
	"github.com/grafana/dskit/tenant"

	"github.com/grafana/loki/v3/pkg/logql"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/grafana/loki/v3/pkg/logqlmodel"
	querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
	"github.com/grafana/loki/v3/pkg/querier/queryrange"
	"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
	utillog "github.com/grafana/loki/v3/pkg/util/log"
	util_validation "github.com/grafana/loki/v3/pkg/util/validation"
)

// Handler returns an [http.Handler] for serving queries. Unsupported queries
// will result in an error.
func Handler(cfg Config, logger log.Logger, engine *Engine, limits querier_limits.Limits) http.Handler {
	return executorHandler(cfg, logger, engine, limits)
}

// queryExecutor is an interface implemented by [Engine] for mocking in tests.
type queryExecutor interface {
	Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error)
}

func executorHandler(cfg Config, logger log.Logger, exec queryExecutor, limits querier_limits.Limits) http.Handler {
	h := &queryHandler{
		cfg:    cfg,
		logger: logger,
		exec:   exec,
		limits: limits,
	}
	return queryrange.NewSerializeHTTPHandler(h, queryrange.DefaultCodec)
}

type queryHandler struct {
	cfg    Config
	logger log.Logger
	exec   queryExecutor
	limits querier_limits.Limits
}

var _ queryrangebase.Handler = (*queryHandler)(nil)

func (h *queryHandler) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
	// TODO(rfratto): Can this be removed by making [querier.Handler] and
	// [querier.QuerierAPI] more generic?

	switch req := req.(type) {
	case *queryrange.LokiRequest:
		res, err := h.doRequest(ctx, req)
		if err != nil {
			return nil, err
		}
		params, err := queryrange.ParamsFromRequest(req)
		if err != nil {
			return nil, err
		}
		return queryrange.ResultToResponse(res, params)

	case *queryrange.LokiInstantRequest:
		res, err := h.doInstantRequest(ctx, req)
		if err != nil {
			return nil, err
		}
		params, err := queryrange.ParamsFromRequest(req)
		if err != nil {
			return nil, err
		}
		return queryrange.ResultToResponse(res, params)

	default:
		return nil, httpgrpc.Errorf(http.StatusNotImplemented, "unsupported query type %T", req)
	}
}

func (h *queryHandler) doRequest(ctx context.Context, req *queryrange.LokiRequest) (logqlmodel.Result, error) {
	logger := utillog.WithContext(ctx, h.logger)

	if err := h.validateMaxEntriesLimits(ctx, req.Plan.AST, req.Limit); err != nil {
		return logqlmodel.Result{}, err
	}

	params, err := queryrange.ParamsFromRequest(req)
	if err != nil {
		return logqlmodel.Result{}, err
	}

	return h.execute(ctx, logger, params)
}

func (h *queryHandler) validateMaxEntriesLimits(ctx context.Context, expr syntax.Expr, limit uint32) error {
	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}

	// entry limit does not apply to metric queries.
	if _, ok := expr.(syntax.SampleExpr); ok {
		return nil
	}

	maxEntriesCapture := func(id string) int { return h.limits.MaxEntriesLimitPerQuery(ctx, id) }
	maxEntriesLimit := util_validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, maxEntriesCapture)
	if int(limit) > maxEntriesLimit && maxEntriesLimit != 0 {
		return httpgrpc.Errorf(http.StatusBadRequest,
			"max entries limit per query exceeded, limit > max_entries_limit_per_query (%d > %d)", limit, maxEntriesLimit)
	}
	return nil
}

func (h *queryHandler) execute(ctx context.Context, logger log.Logger, params logql.Params) (logqlmodel.Result, error) {
	if !h.validTimeRange(params) {
		return logqlmodel.Result{}, httpgrpc.Errorf(http.StatusNotImplemented, "query outside of acceptable time range")
	}

	res, err := h.exec.Execute(ctx, params)
	if err != nil && errors.Is(err, ErrNotSupported) {
		level.Warn(logger).Log("msg", "unsupported query", "err", err)
		return res, httpgrpc.Error(http.StatusNotImplemented, "unsupported query")
	} else if err != nil {
		level.Error(logger).Log("msg", "query execution failed with new query engine", "err", err)
		return res, err
	}
	return res, nil
}

// validTimeRange asserts whether the requested time range in the params are
// supported by the new engine.
func (h *queryHandler) validTimeRange(params logql.Params) bool {
	reqStart, reqEnd := params.Start(), params.End()
	validStart, validEnd := h.cfg.ValidQueryRange()

	return reqEnd.Before(validEnd) && reqStart.After(validStart)
}

func (h *queryHandler) doInstantRequest(ctx context.Context, req *queryrange.LokiInstantRequest) (logqlmodel.Result, error) {
	logger := utillog.WithContext(ctx, h.logger)

	// Do not allow log selector expression (aka log query) as instant query.
	if _, ok := req.Plan.AST.(syntax.SampleExpr); !ok {
		return logqlmodel.Result{}, logqlmodel.ErrUnsupportedSyntaxForInstantQuery
	}

	if err := h.validateMaxEntriesLimits(ctx, req.Plan.AST, req.Limit); err != nil {
		return logqlmodel.Result{}, err
	}

	params, err := queryrange.ParamsFromRequest(req)
	if err != nil {
		return logqlmodel.Result{}, err
	}
	return h.execute(ctx, logger, params)
}
